#define PY_SSIZE_T_CLEAN
#include "Python.h"

#ifndef PYTHON_SDK
#include "box/box_channel.hh"
#include "detail/rpc_probe_impl.hh"
#include "detail/service_layer.hh"
#include "detail/zookeeper/service_finder_zookeeper.hh"
#include "detail/zookeeper/service_register_zookeeper.hh"
#endif // !PYTHON_SDK

#include "config/box_config.hh"
#include "python_helper.hh"
#include <vector>

// Runtime instance shared by every entry point in this file; reached through
// get_buffer_ptr() and torn down by rpc_module_cleanup().
kratos::util::CppRuntime g_runtime;

// Accessor for the file-level runtime object. Always yields the address of
// g_runtime, so the result is never null.
static auto get_buffer_ptr() -> kratos::util::CppRuntime * {
  auto *runtime = &g_runtime;
  return runtime;
}

// Python entry point "Init": initialises the global runtime.
//
// Python arguments (format "sO"):
//   1. str    - configuration path forwarded to the runtime initialiser
//   2. object - log callback; skipped when it is None
//
// Returns True when the runtime initialiser succeeds, False when it fails.
// If the arguments cannot be parsed, nullptr is returned so the exception
// raised by PyArg_ParseTuple reaches the caller. The previous code returned
// Py_False without taking a reference and with that exception still pending.
static auto init(PyObject * /*self*/, PyObject *args) -> PyObject * {
  char *config_path_dir{nullptr};
  PyObject *log_cb_obj{nullptr};
  if (!PyArg_ParseTuple(args, "sO", &config_path_dir, &log_cb_obj)) {
    return nullptr;
  }
  if (log_cb_obj != Py_None) {
    // NOTE(review): log_cb_obj is a borrowed reference here; presumably
    // set_log_cb() takes its own reference - confirm.
    get_buffer_ptr()->set_log_cb(log_cb_obj);
  }
#ifndef PYTHON_SDK
  auto initialised = get_buffer_ptr()->init(config_path_dir);
#else
  auto initialised = get_buffer_ptr()->init_sdk(config_path_dir);
#endif // PYTHON_SDK
  if (initialised) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
}

// Python entry point "Pop": hands back whatever the runtime's pop() returns.
// Takes no Python arguments.
static auto pop(PyObject * /*self*/, PyObject * /*args*/) -> PyObject * {
  auto *runtime = get_buffer_ptr();
  return runtime->pop();
}

// Python entry point "Call".
//
// Python arguments (exactly 7, format "KIKIIiO"):
//   channel_id, call_id, uuid, method_id, service_id, timeout, payload object
//
// Registers the call with the runtime (reg_proxy_call) and then forwards it
// through the runtime's call().
//
// Returns False when the argument count is not 7, when channel_id, call_id,
// uuid or method_id is zero, or when the runtime's call() reports failure;
// True otherwise. If the arguments have the wrong types, nullptr is returned
// so the exception raised by PyArg_ParseTuple propagates; returning a
// non-null object with that exception pending is rejected by the interpreter.
static auto call(PyObject * /*self*/, PyObject *args) -> PyObject * {
  std::uint64_t channel_id{0};
  rpc::CallID call_id{0};
  rpc::ServiceUUID uuid{0};
  rpc::MethodID method_id{0};
  rpc::ServiceID service_id{0};
  int timeout{0};
  PyObject *py_obj{nullptr};
  if (PyTuple_Size(args) != 7) {
    Py_RETURN_FALSE;
  }
  if (!PyArg_ParseTuple(args, "KIKIIiO", &channel_id, &call_id, &uuid,
                        &method_id, &service_id, &timeout, &py_obj)) {
    return nullptr;
  }
  // Zero identifiers are treated as invalid input.
  if (!call_id || !uuid || !method_id || !channel_id || !py_obj) {
    Py_RETURN_FALSE;
  }
  get_buffer_ptr()->reg_proxy_call(call_id, uuid, method_id, service_id,
                                   timeout);
  auto sent = get_buffer_ptr()->call(channel_id, uuid, service_id, call_id,
                                     method_id, py_obj);
  if (sent) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
}

// Python entry point "CallProxy".
//
// Python arguments (exactly 9, format "KIKIIIiiO"):
//   channel_id, call_id, uuid, method_id, service_id, global_index, oneway,
//   timeout, payload object
//
// Registers the call with the runtime (reg_proxy_call) and then forwards it
// through the runtime's call_proxy(); oneway is narrowed to 16 bits for that
// call.
//
// Returns False when the argument count is not 9, when channel_id, call_id,
// uuid, method_id or global_index is zero, or when call_proxy() reports
// failure; True otherwise. If the arguments have the wrong types, nullptr is
// returned so the exception raised by PyArg_ParseTuple propagates instead of
// being left pending next to a non-null result.
static auto call_proxy(PyObject * /*self*/, PyObject *args) -> PyObject * {
  std::uint64_t channel_id{0};
  rpc::CallID call_id{0};
  rpc::ServiceUUID uuid{0};
  rpc::MethodID method_id{0};
  rpc::ServiceID service_id{0};
  rpc::GlobalIndex global_index{0};
  int oneway{0};
  int timeout{0};
  PyObject *py_obj{nullptr};
  if (PyTuple_Size(args) != 9) {
    Py_RETURN_FALSE;
  }
  if (!PyArg_ParseTuple(args, "KIKIIIiiO", &channel_id, &call_id, &uuid,
                        &method_id, &service_id, &global_index, &oneway,
                        &timeout, &py_obj)) {
    return nullptr;
  }
  // Zero identifiers are treated as invalid input.
  if (!call_id || !uuid || !method_id || !global_index || !channel_id ||
      !py_obj) {
    Py_RETURN_FALSE;
  }
  get_buffer_ptr()->reg_proxy_call(call_id, uuid, method_id, service_id,
                                   timeout);
  auto sent = get_buffer_ptr()->call_proxy(
      channel_id, uuid, service_id, call_id, method_id, global_index,
      static_cast<std::uint16_t>(oneway), py_obj);
  if (sent) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
}

// Python entry point "CallRet".
//
// Python arguments (exactly 7, format "KIKIIIO"):
//   channel_id, call_id, uuid, method_id, service_id, error_id, payload object
//
// Forwards the values to the runtime's call_ret().
//
// Returns False when the argument count is not 7, when channel_id, call_id,
// uuid or method_id is zero, or when call_ret() reports failure; True
// otherwise. If the arguments have the wrong types, nullptr is returned so
// the exception raised by PyArg_ParseTuple propagates instead of being left
// pending next to a non-null result.
static auto call_ret(PyObject * /*self*/, PyObject *args) -> PyObject * {
  std::uint64_t channel_id{0};
  rpc::CallID call_id{0};
  rpc::ServiceUUID uuid{0};
  rpc::MethodID method_id{0};
  rpc::ServiceID service_id{0};
  rpc::ErrorID error_id{0};
  PyObject *py_obj{nullptr};
  if (PyTuple_Size(args) != 7) {
    Py_RETURN_FALSE;
  }
  if (!PyArg_ParseTuple(args, "KIKIIIO", &channel_id, &call_id, &uuid,
                        &method_id, &service_id, &error_id, &py_obj)) {
    return nullptr;
  }
  // Zero identifiers are treated as invalid input.
  if (!call_id || !uuid || !method_id || !channel_id || !py_obj) {
    Py_RETURN_FALSE;
  }
  auto sent = get_buffer_ptr()->call_ret(
      channel_id, uuid, method_id, service_id, call_id, error_id, py_obj);
  if (sent) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
}

// Python entry point "CallProxyRet".
//
// Python arguments (exactly 8, format "KIKIIIIO"):
//   channel_id, call_id, uuid, method_id, service_id, error_id, global_index,
//   payload object
//
// Forwards the values to the runtime's call_proxy_ret().
//
// Returns False when the argument count is not 8, when channel_id, call_id,
// uuid, method_id or global_index is zero, or when call_proxy_ret() reports
// failure; True otherwise. If the arguments have the wrong types, nullptr is
// returned so the exception raised by PyArg_ParseTuple propagates instead of
// being left pending next to a non-null result.
static auto call_proxy_ret(PyObject * /*self*/, PyObject *args) -> PyObject * {
  std::uint64_t channel_id{0};
  rpc::CallID call_id{0};
  rpc::ServiceUUID uuid{0};
  rpc::MethodID method_id{0};
  rpc::ServiceID service_id{0};
  rpc::ErrorID error_id{0};
  rpc::GlobalIndex global_index{0};
  PyObject *py_obj{nullptr};
  if (PyTuple_Size(args) != 8) {
    Py_RETURN_FALSE;
  }
  if (!PyArg_ParseTuple(args, "KIKIIIIO", &channel_id, &call_id, &uuid,
                        &method_id, &service_id, &error_id, &global_index,
                        &py_obj)) {
    return nullptr;
  }
  // Zero identifiers are treated as invalid input.
  if (!call_id || !uuid || !method_id || !global_index || !channel_id ||
      !py_obj) {
    Py_RETURN_FALSE;
  }
  auto sent =
      get_buffer_ptr()->call_proxy_ret(channel_id, uuid, method_id, service_id,
                                       call_id, error_id, global_index, py_obj);
  if (sent) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
}

// Python entry point "Subscribe".
//
// Python arguments (exactly 8, format "KsIKIIsO"):
//   channel_id, sub_id (str), proxy_id, uuid, method_id, service_id,
//   evt_name (str), payload object
//
// Forwards the values to the runtime's subscribe().
//
// Returns False when the argument count is not 8, when channel_id, proxy_id
// or uuid is zero, or when subscribe() reports failure; True otherwise. If
// the arguments have the wrong types, nullptr is returned so the exception
// raised by PyArg_ParseTuple propagates instead of being left pending next to
// a non-null result.
static auto subscribe(PyObject * /*self*/, PyObject *args) -> PyObject * {
  std::uint64_t channel_id{0};
  char *sub_id{nullptr};
  rpc::ProxyID proxy_id{0};
  rpc::ServiceUUID uuid{0};
  rpc::MethodID method_id{0};
  rpc::ServiceID service_id{0};
  char *evt_name{nullptr};
  PyObject *py_obj{nullptr};
  if (PyTuple_Size(args) != 8) {
    Py_RETURN_FALSE;
  }
  if (!PyArg_ParseTuple(args, "KsIKIIsO", &channel_id, &sub_id, &proxy_id,
                        &uuid, &method_id, &service_id, &evt_name, &py_obj)) {
    return nullptr;
  }
  // Zero identifiers are treated as invalid input.
  if (!proxy_id || !uuid || !sub_id || !channel_id || !evt_name || !py_obj) {
    Py_RETURN_FALSE;
  }
  auto subscribed =
      get_buffer_ptr()->subscribe(channel_id, sub_id, proxy_id, uuid, method_id,
                                  service_id, evt_name, py_obj);
  if (subscribed) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
}

// Python entry point "Cancel".
//
// Python arguments (exactly 2, format "Ks"): channel_id, sub_id (str).
//
// Forwards the values to the runtime's cancel(), whose result is not
// inspected.
//
// Returns False when the argument count is not 2 or channel_id is zero; True
// once cancel() has been invoked. If the arguments have the wrong types,
// nullptr is returned so the exception raised by PyArg_ParseTuple propagates
// instead of being left pending next to a non-null result.
static auto cancel(PyObject * /*self*/, PyObject *args) -> PyObject * {
  std::uint64_t channel_id{0};
  char *sub_id{nullptr};
  if (PyTuple_Size(args) != 2) {
    Py_RETURN_FALSE;
  }
  if (!PyArg_ParseTuple(args, "Ks", &channel_id, &sub_id)) {
    return nullptr;
  }
  if (!sub_id || !channel_id) {
    Py_RETURN_FALSE;
  }
  get_buffer_ptr()->cancel(channel_id, sub_id);
  Py_RETURN_TRUE;
}

// Python entry point "Publish".
//
// Python arguments (exactly 4, format "KsIO"):
//   channel_id, sub_id (str), proxy_id, payload object
//
// Forwards the values to the runtime's publish(), whose result is not
// inspected.
//
// Returns False when the argument count is not 4 or when channel_id or
// proxy_id is zero; True once publish() has been invoked. If the arguments
// have the wrong types, nullptr is returned so the exception raised by
// PyArg_ParseTuple propagates instead of being left pending next to a
// non-null result.
static auto publish(PyObject * /*self*/, PyObject *args) -> PyObject * {
  std::uint64_t channel_id{0};
  char *sub_id{nullptr};
  rpc::ProxyID proxy_id{0};
  PyObject *py_obj{nullptr};
  if (PyTuple_Size(args) != 4) {
    Py_RETURN_FALSE;
  }
  if (!PyArg_ParseTuple(args, "KsIO", &channel_id, &sub_id, &proxy_id,
                        &py_obj)) {
    return nullptr;
  }
  if (!sub_id || !channel_id || !proxy_id || !py_obj) {
    Py_RETURN_FALSE;
  }
  get_buffer_ptr()->publish(channel_id, sub_id, proxy_id, py_obj);
  Py_RETURN_TRUE;
}

// Python entry point "GetServiceChannel".
//
// Python arguments (format "s"): service name.
//
// Returns a Python int holding the channel id reported for that service. In
// non-SDK builds the lookup goes through the service layer and yields 0 when
// no service layer is available; in SDK builds it goes through the runtime's
// get_channel(). If the argument cannot be parsed, nullptr is returned so the
// exception raised by PyArg_ParseTuple propagates; the previous code returned
// an int 0 with that exception still pending.
static auto GetServiceChannel(PyObject * /*self*/, PyObject *args)
    -> PyObject * {
  char *service_name{nullptr};
  if (!PyArg_ParseTuple(args, "s", &service_name)) {
    return nullptr;
  }
  if (!service_name) {
    return PyLong_FromUnsignedLongLong(0);
  }
#ifndef PYTHON_SDK
  auto *layer_ptr = get_buffer_ptr()->get_service_layer();
  if (!layer_ptr) {
    return PyLong_FromUnsignedLongLong(0);
  }
  return PyLong_FromUnsignedLongLong(layer_ptr->get_channel(service_name));
#else
  return PyLong_FromUnsignedLongLong(
      get_buffer_ptr()->get_channel(service_name));
#endif // PYTHON_SDK
}

// Python entry point "RegisterService".
//
// Python arguments (format "s"): service name.
//
// Returns True when the runtime's register_service() succeeds, False
// otherwise. If the argument cannot be parsed, nullptr is returned so the
// exception raised by PyArg_ParseTuple propagates instead of being left
// pending next to a non-null result.
static auto RegisterService(PyObject * /*self*/, PyObject *args) -> PyObject * {
  char *service_name{nullptr};
  if (!PyArg_ParseTuple(args, "s", &service_name)) {
    return nullptr;
  }
  if (!service_name) {
    Py_RETURN_FALSE;
  }
  auto registered = get_buffer_ptr()->register_service(service_name);
  if (registered) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
}

// Python entry point "UnregisterService".
//
// Python arguments (format "s"): service name.
//
// Returns True when the runtime's unregister_service() succeeds, False
// otherwise. If the argument cannot be parsed, nullptr is returned so the
// exception raised by PyArg_ParseTuple propagates instead of being left
// pending next to a non-null result.
static auto UnregisterService(PyObject * /*self*/, PyObject *args)
    -> PyObject * {
  char *service_name{nullptr};
  if (!PyArg_ParseTuple(args, "s", &service_name)) {
    return nullptr;
  }
  if (!service_name) {
    Py_RETURN_FALSE;
  }
  auto unregistered = get_buffer_ptr()->unregister_service(service_name);
  if (unregistered) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
}

// Python entry point "Deinit": invokes the runtime's deinit() and returns
// None. Takes no Python arguments.
static auto deinit(PyObject * /*self*/, PyObject * /*args*/) -> PyObject * {
  auto *runtime = get_buffer_ptr();
  runtime->deinit();
  Py_RETURN_NONE;
}

// Python entry point "CheckChannel".
//
// Python arguments (format "sK"): service name, channel_id.
//
// Non-SDK builds ask the service layer's check_channel(); SDK builds only
// test whether the runtime's get_channel(service_name) is non-zero.
//
// Returns False when channel_id is zero, when no service layer is available
// (non-SDK builds), or when the check fails; True otherwise. The service
// layer pointer is now null-checked, matching GetServiceChannel, instead of
// being dereferenced unconditionally. If the arguments cannot be parsed,
// nullptr is returned so the exception raised by PyArg_ParseTuple propagates
// instead of being left pending next to a non-null result.
static auto CheckChannel(PyObject * /*self*/, PyObject *args) -> PyObject * {
  char *service_name{nullptr};
  std::uint64_t channel_id{0};
  if (!PyArg_ParseTuple(args, "sK", &service_name, &channel_id)) {
    return nullptr;
  }
  if (!service_name || !channel_id) {
    Py_RETURN_FALSE;
  }
#ifndef PYTHON_SDK
  auto *layer_ptr = get_buffer_ptr()->get_service_layer();
  if (!layer_ptr) {
    Py_RETURN_FALSE;
  }
  auto valid = layer_ptr->check_channel(service_name, channel_id);
#else
  auto valid = (get_buffer_ptr()->get_channel(service_name) != 0);
#endif // PYTHON_SDK
  if (valid) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
}

// Python entry point "ReportBadChannel".
//
// Python arguments (format "K"): channel_id.
//
// Non-SDK builds forward the id to the service layer's report_bad_channel();
// SDK builds call the runtime's close_channel(). A zero channel_id, or a
// missing service layer in non-SDK builds, is ignored. The service layer
// pointer is now null-checked, matching GetServiceChannel, instead of being
// dereferenced unconditionally.
//
// Returns None. If the argument cannot be parsed, nullptr is returned so the
// exception raised by PyArg_ParseTuple propagates instead of being left
// pending next to a non-null result.
static auto ReportBadChannel(PyObject * /*self*/, PyObject *args)
    -> PyObject * {
  std::uint64_t channel_id{0};
  if (!PyArg_ParseTuple(args, "K", &channel_id)) {
    return nullptr;
  }
  if (!channel_id) {
    Py_RETURN_NONE;
  }
#ifndef PYTHON_SDK
  auto *layer_ptr = get_buffer_ptr()->get_service_layer();
  if (layer_ptr) {
    layer_ptr->report_bad_channel(channel_id);
  }
#else
  get_buffer_ptr()->close_channel();
#endif // PYTHON_SDK
  Py_RETURN_NONE;
}

// Python entry point "NewTimer".
//
// Python arguments (format "KK"): timeout, user data.
//
// Returns False when timeout is zero; otherwise returns whatever the
// runtime's new_timer() produces. If the arguments cannot be parsed, nullptr
// is returned so the exception raised by PyArg_ParseTuple propagates instead
// of being left pending next to a non-null result.
static auto NewTimer(PyObject * /*self*/, PyObject *args) -> PyObject * {
  std::uint64_t timeout{0};
  std::uint64_t usr_data{0};
  if (!PyArg_ParseTuple(args, "KK", &timeout, &usr_data)) {
    return nullptr;
  }
  if (!timeout) {
    Py_RETURN_FALSE;
  }
  return get_buffer_ptr()->new_timer(timeout, usr_data);
}

static auto IsOpenTrace(PyObject * /*self*/, PyObject *args) -> PyObject * {
#ifdef PYTHON_SDK
  Py_RETURN_FALSE;
#else
  if (get_buffer_ptr()->get_config().is_open_trace() &&
      get_buffer_ptr()->get_probe()) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
#endif // !PYTHON_SDK
}

static auto IsOpenTimeout(PyObject * /*self*/, PyObject *args) -> PyObject * {
#ifdef PYTHON_SDK
  Py_RETURN_FALSE;
#else
  if (get_buffer_ptr()->get_config().is_open_rpc_timeout()) {
    Py_RETURN_TRUE;
  }
  Py_RETURN_FALSE;
#endif // !PYTHON_SDK
}

#ifndef PYTHON_SDK
// Python entry point "TraceProxyCallSent": forwards a trace record to the
// probe's on_proxy_call_sent().
//
// Python arguments (format "sKKKsssi"):
//   trace_id (str), span_id, parent_span_id, channel_id,
//   target_service_name (str), target_method_name (str), args_info (str),
//   oneway
//
// Does nothing when tracing is disabled in the configuration or when the
// runtime has no probe. The probe check is new; it mirrors the guard used by
// IsOpenTrace and prevents a null dereference.
//
// Returns None. If the arguments cannot be parsed, nullptr is returned so the
// exception raised by PyArg_ParseTuple propagates instead of being left
// pending next to a non-null result.
static auto TraceProxyCallSent(PyObject * /*self*/, PyObject *args)
    -> PyObject * {
  if (!get_buffer_ptr()->get_config().is_open_trace() ||
      !get_buffer_ptr()->get_probe()) {
    Py_RETURN_NONE;
  }
  char *trace_id{nullptr};
  std::uint64_t span_id{0};
  std::uint64_t parent_span_id{0};
  std::uint64_t channel_id{0};
  char *target_service_name{nullptr};
  char *target_method_name{nullptr};
  char *args_info{nullptr};
  int oneway{0};
  if (!PyArg_ParseTuple(args, "sKKKsssi", &trace_id, &span_id, &parent_span_id,
                        &channel_id, &target_service_name, &target_method_name,
                        &args_info, &oneway)) {
    return nullptr;
  }
  if (!trace_id || !target_service_name || !target_method_name || !args_info) {
    Py_RETURN_NONE;
  }
  // The probe receives the channel as an rpc::Transport handle.
  auto channel = std::dynamic_pointer_cast<rpc::Transport>(
      get_buffer_ptr()->get_channel(channel_id));
  get_buffer_ptr()->get_probe()->on_proxy_call_sent(
      trace_id, span_id, parent_span_id, channel, target_service_name,
      target_method_name, args_info, oneway);
  Py_RETURN_NONE;
}

// Python entry point "TraceStubCallArrived": forwards a trace record to the
// probe's on_stub_call_arrived().
//
// Python arguments (format "sKKKsss"):
//   trace_id (str), span_id, parent_span_id, channel_id,
//   target_service_name (str), target_method_name (str), retval_info (str)
//
// Does nothing when tracing is disabled in the configuration or when the
// runtime has no probe. The probe check is new; it mirrors the guard used by
// IsOpenTrace and prevents a null dereference.
//
// Returns None. If the arguments cannot be parsed, nullptr is returned so the
// exception raised by PyArg_ParseTuple propagates instead of being left
// pending next to a non-null result.
static auto TraceStubCallArrived(PyObject * /*self*/, PyObject *args)
    -> PyObject * {
  if (!get_buffer_ptr()->get_config().is_open_trace() ||
      !get_buffer_ptr()->get_probe()) {
    Py_RETURN_NONE;
  }
  char *trace_id{nullptr};
  std::uint64_t span_id{0};
  std::uint64_t parent_span_id{0};
  std::uint64_t channel_id{0};
  char *target_service_name{nullptr};
  char *target_method_name{nullptr};
  char *retval_info{nullptr};
  if (!PyArg_ParseTuple(args, "sKKKsss", &trace_id, &span_id, &parent_span_id,
                        &channel_id, &target_service_name, &target_method_name,
                        &retval_info)) {
    return nullptr;
  }
  if (!trace_id || !target_service_name || !target_method_name ||
      !retval_info) {
    Py_RETURN_NONE;
  }
  // The probe receives the channel as an rpc::Transport handle.
  auto channel = std::dynamic_pointer_cast<rpc::Transport>(
      get_buffer_ptr()->get_channel(channel_id));
  get_buffer_ptr()->get_probe()->on_stub_call_arrived(
      trace_id, span_id, parent_span_id, channel, target_service_name,
      target_method_name, retval_info);
  Py_RETURN_NONE;
}

// Python entry point "TraceProxyCallReturnArrived": forwards a trace record
// to the probe's on_proxy_call_return_arrived().
//
// Python arguments (format "sKKKsss"):
//   trace_id (str), span_id, parent_span_id, channel_id,
//   target_service_name (str), target_method_name (str), retval_info (str)
//
// Does nothing when tracing is disabled in the configuration or when the
// runtime has no probe. The probe check is new; it mirrors the guard used by
// IsOpenTrace and prevents a null dereference.
//
// Returns None. If the arguments cannot be parsed, nullptr is returned so the
// exception raised by PyArg_ParseTuple propagates instead of being left
// pending next to a non-null result.
static auto TraceProxyCallReturnArrived(PyObject * /*self*/, PyObject *args)
    -> PyObject * {
  if (!get_buffer_ptr()->get_config().is_open_trace() ||
      !get_buffer_ptr()->get_probe()) {
    Py_RETURN_NONE;
  }
  char *trace_id{nullptr};
  std::uint64_t span_id{0};
  std::uint64_t parent_span_id{0};
  std::uint64_t channel_id{0};
  char *target_service_name{nullptr};
  char *target_method_name{nullptr};
  char *retval_info{nullptr};
  if (!PyArg_ParseTuple(args, "sKKKsss", &trace_id, &span_id, &parent_span_id,
                        &channel_id, &target_service_name, &target_method_name,
                        &retval_info)) {
    return nullptr;
  }
  if (!trace_id || !target_service_name || !target_method_name ||
      !retval_info) {
    Py_RETURN_NONE;
  }
  // The probe receives the channel as an rpc::Transport handle.
  auto channel = std::dynamic_pointer_cast<rpc::Transport>(
      get_buffer_ptr()->get_channel(channel_id));
  get_buffer_ptr()->get_probe()->on_proxy_call_return_arrived(
      trace_id, span_id, parent_span_id, channel, target_service_name,
      target_method_name, retval_info);
  Py_RETURN_NONE;
}

// Python entry point "TraceStubCallReturnSent": forwards a trace record to
// the probe's on_stub_call_return_sent().
//
// Python arguments (format "sKKKsss"):
//   trace_id (str), span_id, parent_span_id, channel_id,
//   target_service_name (str), target_method_name (str), retval_info (str)
//
// Does nothing when tracing is disabled in the configuration or when the
// runtime has no probe. The probe check is new; it mirrors the guard used by
// IsOpenTrace and prevents a null dereference.
//
// Returns None. If the arguments cannot be parsed, nullptr is returned so the
// exception raised by PyArg_ParseTuple propagates instead of being left
// pending next to a non-null result.
static auto TraceStubCallReturnSent(PyObject * /*self*/, PyObject *args)
    -> PyObject * {
  if (!get_buffer_ptr()->get_config().is_open_trace() ||
      !get_buffer_ptr()->get_probe()) {
    Py_RETURN_NONE;
  }
  char *trace_id{nullptr};
  std::uint64_t span_id{0};
  std::uint64_t parent_span_id{0};
  std::uint64_t channel_id{0};
  char *target_service_name{nullptr};
  char *target_method_name{nullptr};
  char *retval_info{nullptr};
  if (!PyArg_ParseTuple(args, "sKKKsss", &trace_id, &span_id, &parent_span_id,
                        &channel_id, &target_service_name, &target_method_name,
                        &retval_info)) {
    return nullptr;
  }
  if (!trace_id || !target_service_name || !target_method_name ||
      !retval_info) {
    Py_RETURN_NONE;
  }
  // The probe receives the channel as an rpc::Transport handle.
  auto channel = std::dynamic_pointer_cast<rpc::Transport>(
      get_buffer_ptr()->get_channel(channel_id));
  get_buffer_ptr()->get_probe()->on_stub_call_return_sent(
      trace_id, span_id, parent_span_id, channel, target_service_name,
      target_method_name, retval_info);
  Py_RETURN_NONE;
}

static auto OnGenTraceID(PyObject * /*self*/, PyObject *args) -> PyObject * {
  if (!get_buffer_ptr()->get_probe()) {
    Py_RETURN_NONE;
  }
  std::string trace_id;
  get_buffer_ptr()->get_probe()->on_gen_uuid(trace_id);
  return PyUnicode_FromString(trace_id.c_str());
}

// Python entry point "OnGenUUID". Takes no Python arguments.
//
// Returns None when the runtime has no probe; otherwise returns a Python int
// holding the value the probe's on_gen_uuid() writes into a 64-bit integer.
static auto OnGenUUID(PyObject * /*self*/, PyObject * /*args*/) -> PyObject * {
  if (!get_buffer_ptr()->get_probe()) {
    Py_RETURN_NONE;
  }
  // Zero-initialised so the result is well defined even if the probe leaves
  // the out-parameter untouched.
  std::uint64_t uuid{0};
  get_buffer_ptr()->get_probe()->on_gen_uuid(uuid);
  return PyLong_FromUnsignedLongLong(uuid);
}

// Python entry point "CreateUUID": hands back whatever the runtime's
// create_uuid() returns. Takes no Python arguments.
static auto CreateUUID(PyObject * /*self*/, PyObject * /*args*/) -> PyObject * {
  auto *runtime = get_buffer_ptr();
  return runtime->create_uuid();
}

#endif // !PYTHON_SDK

#ifdef PYTHON_SDK
// Python entry point "Connect" (SDK builds only).
//
// Python arguments (format "O"): an iterable of str host entries.
//
// Every element is copied into a std::vector<std::string> that is handed to
// the runtime's connect_proxy().
//
// Returns True when connect_proxy() succeeds, False when it fails or when an
// element is not a str. Returns nullptr with the Python exception left set
// when the argument cannot be parsed, is not iterable, iteration raises, or a
// string cannot be converted.
//
// Fixes over the previous version: the iterator and every item reference are
// now released on all paths, an error raised during iteration is no longer
// mistaken for normal exhaustion, and no non-null result is returned while an
// exception is pending.
static auto connect(PyObject * /*self*/, PyObject *args) -> PyObject * {
  PyObject *obj_ptr{nullptr};
  if (!PyArg_ParseTuple(args, "O", &obj_ptr)) {
    return nullptr;
  }
  PyObject *iter = PyObject_GetIter(obj_ptr);
  if (!iter) {
    return nullptr;
  }
  std::vector<std::string> hosts;
  // PyIter_Next yields a new reference per item and nullptr at the end of the
  // sequence or on error.
  while (PyObject *next = PyIter_Next(iter)) {
    if (!PyUnicode_Check(next)) {
      Py_DECREF(next);
      Py_DECREF(iter);
      Py_RETURN_FALSE;
    }

#if PY_MAJOR_VERSION >= 3 && PY_MINOR_VERSION >= 3
    const auto *host = PyUnicode_AsUTF8(next);
#else
    const auto *host = PyUnicode_AS_DATA(next);
#endif // PY_MAJOR_VERSION >= 3 && PY_MINOR_VERSION >=3
    if (!host) {
      Py_DECREF(next);
      Py_DECREF(iter);
      // Propagate a conversion error if one was raised; otherwise keep the
      // original "False" answer.
      if (PyErr_Occurred()) {
        return nullptr;
      }
      Py_RETURN_FALSE;
    }
    // Copy the text before dropping the item: the buffer belongs to `next`.
    hosts.emplace_back(host);
    Py_DECREF(next);
  }
  Py_DECREF(iter);
  // A nullptr from PyIter_Next with an exception set means iteration failed.
  if (PyErr_Occurred()) {
    return nullptr;
  }
  if (!get_buffer_ptr()->connect_proxy(hosts)) {
    Py_RETURN_FALSE;
  }
  Py_RETURN_TRUE;
}
#endif // PYTHON_SDK

// Method table exported by the extension module. Each row is
// {Python-visible name, C entry point, calling convention, docstring}.
// "Connect" exists only in SDK builds; the tracing and UUID helpers exist only
// in non-SDK builds.
static PyMethodDef buffer_methods[] = {
    {"Init", init, METH_VARARGS, "Initialize RPC framework"},
    {"Deinit", deinit, METH_NOARGS, "De-initialize RPC framework"},
#ifdef PYTHON_SDK
    {"Connect", connect, METH_VARARGS, "Connect to cluster"},
#endif // PYTHON_SDK
    {"Pop", pop, METH_NOARGS, "Pop a RPC protocol object from buffer"},
    {"Call", call, METH_VARARGS, "rpc::RpcCallRequestHeader"},
    // NOTE(review): this docstring names a "Ret" header for the request path;
    // confirm it is the intended header name.
    {"CallProxy", call_proxy, METH_VARARGS, "rpc::RpcProxyCallRetHeader"},
    {"CallRet", call_ret, METH_VARARGS, "rpc::RpcCallRetHeader"},
    {"CallProxyRet", call_proxy_ret, METH_VARARGS, "rpc::RpcProxyRetHeader"},
    {"Subscribe", subscribe, METH_VARARGS, "rpc::RpcSubHeader"},
    {"Cancel", cancel, METH_VARARGS, "rpc::RpcCancelSubHeader"},
    {"Publish", publish, METH_VARARGS, "rpc::RpcPubHeader"},
    {"GetServiceChannel", GetServiceChannel, METH_VARARGS,
     "Get channel ID of host which hosts service via service name"},
    {"CheckChannel", CheckChannel, METH_VARARGS,
     "Check whether channel is valid"},
    {"RegisterService", RegisterService, METH_VARARGS,
     "Register service name and host to cluster"},
    {"UnregisterService", UnregisterService, METH_VARARGS,
     "Un-register service name and host from cluster"},
    {"ReportBadChannel", ReportBadChannel, METH_VARARGS,
     "Report to service layer which channel is bad"},
    // Docstring typo fixed: "millionsecond" -> "milliseconds".
    {"NewTimer", NewTimer, METH_VARARGS,
     "Fire a timer with timeout in milliseconds"},
    {"IsOpenTrace", IsOpenTrace, METH_NOARGS, "Check open_trace flag"},
    {"IsOpenTimeout", IsOpenTimeout, METH_NOARGS,
     "Check open_rpc_timeout flag"},
#ifndef PYTHON_SDK
    {"TraceProxyCallSent", TraceProxyCallSent, METH_VARARGS,
     "Report trace info."},
    {"TraceStubCallArrived", TraceStubCallArrived, METH_VARARGS,
     "Report trace info."},
    {"TraceProxyCallReturnArrived", TraceProxyCallReturnArrived, METH_VARARGS,
     "Report trace info."},
    {"TraceStubCallReturnSent", TraceStubCallReturnSent, METH_VARARGS,
     "Report trace info."},
    {"OnGenTraceID", OnGenTraceID, METH_NOARGS, "Generates trace ID"},
    {"OnGenUUID", OnGenUUID, METH_NOARGS, "Generates UUID"},
    {"CreateUUID", CreateUUID, METH_NOARGS, "Generates UUID string"},
#endif                             // !PYTHON_SDK
    {nullptr, nullptr, 0, nullptr} /* Sentinel */
};

static void rpc_module_cleanup() { g_runtime.deinit(false); }

// Select the module-init symbol (INIT_FUNC) and the module name (MOD_NAME).
// Non-MSVC builds with DEBUG or _DEBUG defined export PyInit_rpc_d / "rpc_d";
// every other configuration, including all MSVC builds, exports
// PyInit_rpc / "rpc".
#if !defined(_MSC_VER)
#if defined(DEBUG) || defined(_DEBUG)
#define INIT_FUNC PyMODINIT_FUNC PyInit_rpc_d(void)
#define MOD_NAME "rpc_d"
#else
#define INIT_FUNC PyMODINIT_FUNC PyInit_rpc(void)
#define MOD_NAME "rpc"
#endif // defined(DEBUG) || defined(_DEBUG)
#else
#define INIT_FUNC PyMODINIT_FUNC PyInit_rpc(void)
#define MOD_NAME "rpc"
#endif // !defined(_MSC_VER)

#if PY_MAJOR_VERSION >= 3 && PY_MINOR_VERSION >= 3

static struct PyModuleDef rpc_module = {PyModuleDef_HEAD_INIT, MOD_NAME,
                                        "RPC Framework", -1, buffer_methods};

// Module initialisation: creates the module object from rpc_module and
// registers rpc_module_cleanup to run at interpreter exit.
//
// Returns the new module, or nullptr with a Python exception set on failure.
// When Py_AtExit fails, the half-built module is now released and a
// RuntimeError is raised; previously the module leaked and nullptr was
// returned without any exception set.
INIT_FUNC {
  auto *m = PyModule_Create(&rpc_module);
  if (!m) {
    return nullptr;
  }
  if (Py_AtExit(rpc_module_cleanup) == -1) {
    Py_DECREF(m);
    PyErr_SetString(PyExc_RuntimeError,
                    "rpc: failed to register exit-time cleanup handler");
    return nullptr;
  }
  return m;
}

#else

// Legacy module initialisation via Py_InitModule: the exit-time cleanup hook
// is registered only when module creation succeeded.
INIT_FUNC {
  auto *module = Py_InitModule(MOD_NAME, buffer_methods);
  if (module != nullptr) {
    Py_AtExit(rpc_module_cleanup);
  }
}

#endif // PY_MAJOR_VERSION >= 3
